import 'dart:async';
import 'dart:typed_data';

import 'package:async/async.dart';

import '../common/test_page.dart';

class ChunkedStreamReaderTestPage extends TestPage {
  ChunkedStreamReaderTestPage(super.title) {
    test('readChunk()一块一块', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(2));
      expect(await r.readChunk(3));
      expect(await r.readChunk(4));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()逐个元素', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      for (var i = 0; i < 10; i++) {
        expect(await r.readChunk(1));
      }
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()逐个元素', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(10));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()过期', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(20));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()由2个元素组成的块', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()由3个元素组成的块', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()中途取消', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(5));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk()传播异常', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        throw Exception('stopping here');
      }());

      expect(await r.readChunk(3));
      expect(r.readChunk(3));

      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readStream()转发块', () async {
      final chunk2 = [3, 4, 5];
      final chunk3 = [6, 7, 8, 9];
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield chunk2;
        yield chunk3;
        yield [10];
      }());

      expect(await r.readChunk(1));
      final i = StreamIterator(r.readStream(9));
      expect(await i.moveNext());
      expect(i.current);

      expect(await i.moveNext());
      expect(i.current);
      expect(i.current == chunk2);

      expect(await i.moveNext());
      expect(i.current);
      expect(i.current == chunk3);

      expect(await i.moveNext());
      expect(i.current);
      expect(await i.moveNext());

      expect(await r.readChunk(1));
      await r.cancel(); // check this is okay!
      expect(await r.readChunk(1));
    });

    test('readStream()最后取消', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      final i = StreamIterator(r.readStream(7));
      expect(await i.moveNext());
      expect(i.current);

      expect(await i.moveNext());
      expect(i.current);

      expect(await i.moveNext());
      expect(i.current);

      await i.cancel();

      expect(await r.readChunk(2));

      expect(await r.readChunk(1));
      await r.cancel(); // check this is okay!
      expect(await r.readChunk(1));
    });

    test('readStream()在区块边界的确切末端取消', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      final i = StreamIterator(r.readStream(8));
      expect(await i.moveNext());
      expect(i.current);

      expect(await i.moveNext());
      expect(i.current);

      expect(await i.moveNext());
      expect(i.current);

      await i.cancel();

      expect(await r.readChunk(2));

      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readStream()取消时已耗尽', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      final i = StreamIterator(r.readStream(7));
      expect(await i.moveNext());
      expect(i.current);
      await i.cancel();

      expect(await r.readChunk(2));

      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readStream()禁止并发读取', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      r.readStream(7);

      expect(await r.readChunk(2));
    });

    test('readStream()支撑排水', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      await r.readStream(7).drain();
      expect(await r.readChunk(2));

      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('嵌套ChunkedStreamReader()', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readChunk(1));
      final r2 = ChunkedStreamReader(r.readStream(7));
      expect(await r2.readChunk(2));
      expect(await r2.readChunk(1));
      await r2.cancel();

      expect(await r.readChunk(2));

      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('ChunkedStreamReader().readBytes()由3个元素组成的块', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2];
        yield [3, 4, 5];
        yield [6, 7, 8, 9];
        yield [10];
      }());

      expect(await r.readBytes(3));
      expect(await r.readBytes(3));
      expect(await r.readBytes(3));
      expect(await r.readBytes(3));
      expect(await r.readBytes(1));
      expect(await r.readBytes(1));
      await r.cancel();
      expect(await r.readBytes(1));
    });

    test('readChunk()直到流完全结束', () async {
      final stream = Stream.fromIterable(Iterable.generate(
        10,
            (_) => Uint8List(512),
      ));

      final r = ChunkedStreamReader(stream);
      while (true) {
        final c = await r.readBytes(1024);
        if (c.isEmpty) {
          expect(c);
          break;
        }
      }
    });

    test('在readChunk()挂起时取消', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2, 3];
        await Completer().future;
        yield [4];
        fail('unreachable!');
      }());

      expect(await r.readBytes(2));

      final future = r.readChunk(2);

      await Future.microtask(() => null);
      r.cancel();

      expect(await future);
    });

    test('在readStream()挂起时取消', () async {
      final r = ChunkedStreamReader(() async* {
        yield [1, 2, 3];
        await Completer().future;
        yield [4];
        fail('unreachable!');
      }());

      expect(await collectBytes(r.readStream(2)));

      final stream = r.readStream(2);

      await Future.microtask(() => null);
      r.cancel();

      expect(await collectBytes(stream));
    });

    test('readChunk() 一块一块 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      expect(await r.readChunk(2));
      expect(await r.readChunk(3));
      expect(await r.readChunk(4));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk() 逐个元素 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      for (var i = 0; i < 10; i++) {
        expect(await r.readChunk(1));
      }
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk() 精确元素 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      expect(await r.readChunk(10));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk() 过期 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      expect(await r.readChunk(20));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk() 由2个元素组成的块 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(2));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });

    test('readChunk() 由3个元素组成的块 (Uint8List)', () async {
      final r = ChunkedStreamReader(() async* {
        yield Uint8List.fromList([1, 2]);
        yield Uint8List.fromList([3, 4, 5]);
        yield Uint8List.fromList([6, 7, 8, 9]);
        yield Uint8List.fromList([10]);
      }());

      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(3));
      expect(await r.readChunk(1));
      expect(await r.readChunk(1));
      await r.cancel();
      expect(await r.readChunk(1));
    });
  }

}